---
title: Streaming
description: Process agent responses, tool calls and results in real-time through streaming or handlers.
icon: bars-staggered
---

import { VersionBadge } from '/snippets/version-badge.mdx'


ControlFlow provides two ways to process events during task execution:
- [**Streaming**](#streaming): Iterate over events in real-time using a Python iterator 
- [**Handlers**](#handlers): Register callback functions that are called for each event

Both approaches give you access to the same events; which one you choose depends on how you want to integrate with your application.

## Streaming

<VersionBadge version="0.12.0" />

When you enable streaming, task execution returns an iterator that yields events as they occur. Each iteration provides a tuple of `(event, snapshot, delta)` representing what just happened in the workflow:

```python
import controlflow as cf

for event, snapshot, delta in cf.run(
    "Write a poem about AI",
    stream=True,
):
    # For complete events, snapshot contains the full content
    if event.event == "agent-content":
        print(f"Agent wrote: {snapshot}")
    
    # For delta events, delta contains just what's new
    elif event.event == "agent-content-delta":
        print(delta, end="", flush=True)
```

You can focus on specific events using the `Stream` enum. Here, we return only content updates:

```python
import controlflow as cf

# Only stream content updates
for event, snapshot, delta in cf.run(
    "Write a poem",
    stream=cf.Stream.CONTENT
):
    print(delta if delta else snapshot)
```

The available stream filters are:
- `Stream.ALL`: All events (equivalent to `stream=True`) 
- `Stream.CONTENT`: Agent content and content deltas
- `Stream.TOOLS`: All tool events
- `Stream.COMPLETION_TOOLS`: Completion tool events (like marking a task successful or failed)
- `Stream.AGENT_TOOLS`: Tools used by agents for any purpose other than completing a task
- `Stream.TASK_EVENTS`: Task lifecycle events (starting, completion, failure, etc.)

You can combine filters with the `|` operator:

```python
import controlflow as cf

# Stream content and tool events
stream = cf.Stream.CONTENT | cf.Stream.TOOLS
```
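To illustrate how `|`-combined filters behave, here is a minimal sketch built on the standard library's `enum.Flag`. `DemoStream` is a hypothetical stand-in written for this example. It is not ControlFlow's actual `Stream` definition, and the way its composite members are built is an assumption based on the filter descriptions above.

```python
from enum import Flag, auto

# Hypothetical stand-in for illustration; not ControlFlow's actual Stream class.
class DemoStream(Flag):
    CONTENT = auto()
    AGENT_TOOLS = auto()
    COMPLETION_TOOLS = auto()
    TASK_EVENTS = auto()
    # Composite members built from the individual flags (an assumption)
    TOOLS = AGENT_TOOLS | COMPLETION_TOOLS
    ALL = CONTENT | TOOLS | TASK_EVENTS

# Combine two filters with the bitwise OR operator
selected = DemoStream.CONTENT | DemoStream.TOOLS

# Membership tests show which categories the combined filter covers
wants_content = DemoStream.CONTENT in selected
wants_agent_tools = DemoStream.AGENT_TOOLS in selected
wants_task_events = DemoStream.TASK_EVENTS in selected
print(wants_content, wants_agent_tools, wants_task_events)
```

In this sketch, the combined value covers content and both kinds of tool events but leaves task lifecycle events out.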

For more complex filtering, set `stream=True` and filter the events manually, or use a handler.
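Manual filtering can be sketched as a small generator that inspects `event.event` on each tuple. The `fake_stream` function below is a hypothetical stand-in so the example runs without an LLM; with ControlFlow you would iterate over `cf.run(..., stream=True)` instead. The payload values are invented for illustration.

```python
from types import SimpleNamespace

def fake_stream():
    """Hypothetical stand-in for cf.run(..., stream=True).

    Yields (event, snapshot, delta) tuples with made-up payloads.
    """
    yield SimpleNamespace(event="agent-content-delta"), "Ro", "Ro"
    yield SimpleNamespace(event="agent-tool-call"), {"name": "search"}, None
    yield SimpleNamespace(event="agent-content-delta"), "Roses", "ses"
    yield SimpleNamespace(event="agent-content"), "Roses", None

# Event names we care about; everything else is skipped
WANTED = {"agent-content-delta"}

def content_deltas(stream):
    """Yield only the non-empty deltas of the wanted event types."""
    for event, snapshot, delta in stream:
        if event.event in WANTED and delta:
            yield delta

text = "".join(content_deltas(fake_stream()))
print(text)
```

The same loop can apply any condition you like, such as filtering by agent name or by the contents of the snapshot.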

## Handlers

<VersionBadge version="0.9.2" />

For more complex event processing, or when you want to decouple event handling from your main workflow, use handlers:

```python 
import controlflow as cf
from controlflow.events.events import AgentMessage, ToolResult
from controlflow.orchestration.handler import Handler

class LoggingHandler(Handler):
    def on_agent_message(self, event: AgentMessage):
        print(f"Agent {event.agent.name} said: {event.message['content']}")

    def on_tool_result(self, event: ToolResult):
        print(f"Tool call result: {event.tool_result.str_result}")

# Use the handler
cf.run("Write a poem", handlers=[LoggingHandler()])
```

Handlers are especially useful for:
- Adding logging or monitoring
- Collecting metrics
- Updating UI elements
- Processing events asynchronously

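Handlers dispatch each event to the matching `on_<event_name>` method, where the event name is written with underscores instead of hyphens (for example, `agent-content-delta` events are passed to `on_agent_content_delta`). For a complete list of available methods, see the [Event Details](#event-details) section below.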
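The naming convention can be sketched as a small dispatcher. `MiniHandler` is a hypothetical class written for this example, not ControlFlow's `Handler`; the hyphen-to-underscore mapping is inferred from the event and method names on this page, and events are represented as plain dicts for brevity.

```python
class MiniHandler:
    """Hypothetical dispatcher illustrating the naming convention."""

    def handle(self, event):
        # The catch-all hook runs first, before any specific method
        self.on_event(event)
        # "agent-content-delta" -> "on_agent_content_delta"
        method_name = "on_" + event["event"].replace("-", "_")
        method = getattr(self, method_name, None)
        if method is not None:
            method(event)

    def on_event(self, event):
        pass

class Recorder(MiniHandler):
    def __init__(self):
        self.calls = []

    def on_event(self, event):
        self.calls.append("on_event")

    def on_agent_content_delta(self, event):
        self.calls.append("on_agent_content_delta")

recorder = Recorder()
recorder.handle({"event": "agent-content-delta"})  # has a specific method
recorder.handle({"event": "tool-result"})  # only the catch-all runs
print(recorder.calls)
```

Events without a matching method are simply ignored by the specific dispatch step, so a handler only needs to define the methods it cares about.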


### Async Handlers

<VersionBadge version="0.11.1" />

For asynchronous event processing, use `AsyncHandler`:

```python
import asyncio

import controlflow as cf
from controlflow.events.events import AgentMessage
from controlflow.orchestration.handler import AsyncHandler

class AsyncLoggingHandler(AsyncHandler):
    async def on_agent_message(self, event: AgentMessage):
        await asyncio.sleep(0.1)  # Simulate async operation
        print(f"Agent {event.agent.name} said: {event.message['content']}")

async def main():
    await cf.run_async("Write a poem", handlers=[AsyncLoggingHandler()])

asyncio.run(main())
```

## Example: Real-time Content Display

Here's a complete example showing both approaches to display content in real-time:

<CodeGroup>
```python Streaming
import controlflow as cf

for event, snapshot, delta in cf.run(
    "Write a story about time travel",
    stream=cf.Stream.CONTENT
):
    # Print each new chunk of text as it arrives
    if delta:
        print(delta, end="", flush=True)
```

```python Handler
import controlflow as cf
from controlflow.orchestration.handler import Handler

class ContentHandler(Handler):
    def on_agent_content_delta(self, event):
        # Print each new chunk of text as it arrives
        print(event.content_delta, end="", flush=True)
        
cf.run(
    "Write a story about time travel",
    handlers=[ContentHandler()]
)
```
</CodeGroup>

## Event Details

Now that we've seen how to process events, let's look at the types of events you can receive:

### Content Events
Content events give you access to what an agent is saying or writing:

```python
# Complete content
{
    "event": "agent-content",
    "agent": agent,  # Agent object
    "content": "Hello, world!", # The complete content
    "agent_message_id": "msg_123"  # Optional ID linking to parent message
}

# Content delta (incremental update)
{
    "event": "agent-content-delta",
    "agent": agent,
    "content_delta": " world!",  # New content since last update
    "content_snapshot": "Hello, world!",  # Complete content so far
    "agent_message_id": "msg_123"
}
```
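The relationship between `content_delta` and `content_snapshot` can be sketched with plain dicts that mirror the fields above. This assumes deltas arrive in order and that each snapshot is the concatenation of all deltas so far, which is how the field descriptions read; the sample values are invented.

```python
# Plain dicts mirroring the documented fields; values are invented.
deltas = [
    {
        "event": "agent-content-delta",
        "content_delta": "Hello",
        "content_snapshot": "Hello",
    },
    {
        "event": "agent-content-delta",
        "content_delta": ", world!",
        "content_snapshot": "Hello, world!",
    },
]

accumulated = ""
for event in deltas:
    # Append only the new text carried by this event
    accumulated += event["content_delta"]
    # Under the assumption above, the running text matches the snapshot
    assert accumulated == event["content_snapshot"]

print(accumulated)
```

If you only need the text so far, reading the snapshot avoids keeping your own buffer; the delta is the convenient field for incremental display.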

### Tool Events
Tool events let you observe when agents use tools and get their results:

```python
# Tool being called
{
    "event": "agent-tool-call",
    "agent": agent,
    "tool_call": {...},  # The complete tool call info
    "tool": tool,  # The Tool object being called
    "args": {...},  # Arguments passed to the tool
    "agent_message_id": "msg_123"
}

# Tool call delta (incremental update)
{
    "event": "agent-tool-call-delta",
    "agent": agent,
    "tool_call_delta": {...},  # Changes to the tool call
    "tool_call_snapshot": {...},  # Complete tool call info so far
    "tool": tool,
    "args": {...},
    "agent_message_id": "msg_123"
}

# Tool result
{
    "event": "tool-result",
    "agent": agent,
    "tool_result": {
        "tool_call": {...},  # The original tool call
        "tool": tool,  # The Tool object that was called
        "result": any,  # The raw result value
        "str_result": "...",  # String representation of result
        "is_error": False  # Whether the tool call failed
    }
}
```
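As a rough sketch of how a tool result might be consumed, the function below checks `is_error` before reporting `str_result`. It uses plain dicts that mirror the layout above so that it runs on its own; real event objects use attribute access, as in the handler example (`event.tool_result.str_result`). The `summarize_tool_result` helper and the sample values are hypothetical.

```python
def summarize_tool_result(event):
    """Build a one-line summary from a tool-result event dict."""
    result = event["tool_result"]
    status = "failed" if result["is_error"] else "succeeded"
    return f"Tool call {status}: {result['str_result']}"

# Invented sample events in the documented shape (some fields omitted)
ok_event = {
    "event": "tool-result",
    "tool_result": {"result": 4, "str_result": "4", "is_error": False},
}
error_event = {
    "event": "tool-result",
    "tool_result": {
        "result": None,
        "str_result": "division by zero",
        "is_error": True,
    },
}

ok_summary = summarize_tool_result(ok_event)
error_summary = summarize_tool_result(error_event)
print(ok_summary)
print(error_summary)
```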

### Task Events
Events that mark key points in a task's lifecycle:
- `TaskStart`: A task has begun execution
- `TaskSuccess`: A task completed successfully (includes the final result)
- `TaskFailure`: A task failed (includes the error reason)
- `TaskSkipped`: A task was skipped

### Orchestration Events
Events related to orchestrating the overall workflow:
- `OrchestratorStart`/`OrchestratorEnd`: Workflow orchestration starting/ending
- `AgentTurnStart`/`AgentTurnEnd`: An agent's turn starting/ending
- `OrchestratorError`: An error occurred during orchestration

### Handler Methods

Each handler can implement methods for different types of events. The method will be called whenever that type of event occurs. Here are all available handler methods:

| Method | Event Type | Description |
|--------|------------|-------------|
| `on_event(event)` | Any | Called for every event, before any specific handler |
| `on_agent_message(event)` | AgentMessage | Raw LLM output containing both content and tool calls |
| `on_agent_message_delta(event)` | AgentMessageDelta | Incremental updates to raw LLM output |
| `on_agent_content(event)` | AgentContent | Unstructured text output from an agent |
| `on_agent_content_delta(event)` | AgentContentDelta | Incremental updates to agent content |
| `on_agent_tool_call(event)` | AgentToolCall | Tool being called by an agent |
| `on_agent_tool_call_delta(event)` | AgentToolCallDelta | Incremental updates to a tool call |
| `on_tool_result(event)` | ToolResult | Result returned from a tool |
| `on_orchestrator_start(event)` | OrchestratorStart | Workflow orchestration starting |
| `on_orchestrator_end(event)` | OrchestratorEnd | Workflow orchestration completed |
| `on_agent_turn_start(event)` | AgentTurnStart | An agent beginning their turn |
| `on_agent_turn_end(event)` | AgentTurnEnd | An agent completing their turn |
| `on_orchestrator_error(event)` | OrchestratorError | Error during orchestration |

Note that AgentMessage is the "raw" output from the LLM and contains both unstructured content and structured tool calls. When you receive an AgentMessage, you will also receive separate AgentContent and/or AgentToolCall events for any content or tool calls contained in that message. This allows you to:
1. Handle all LLM output in one place with `on_agent_message`
2. Handle just content with `on_agent_content` 
3. Handle just tool calls with `on_agent_tool_call`
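The fan-out from one raw message into separate events can be pictured with the sketch below. Everything in it is hypothetical: the `fan_out` function, the `tool_calls` key, the `agent-message` event name, and the emission order are assumptions made for illustration, not ControlFlow's implementation.

```python
def fan_out(message):
    """Hypothetical illustration: derive per-part events from one raw message."""
    # The raw message event always comes first in this sketch
    events = [{"event": "agent-message", "message": message}]
    # Emit a content event only if the message carries text
    if message.get("content"):
        events.append({"event": "agent-content", "content": message["content"]})
    # Emit one tool call event per structured tool call
    for call in message.get("tool_calls", []):
        events.append({"event": "agent-tool-call", "tool_call": call})
    return events

raw = {
    "content": "Let me check.",
    "tool_calls": [{"name": "search", "args": {"q": "AI"}}],
}
names = [e["event"] for e in fan_out(raw)]
print(names)
```

A handler that implements both `on_agent_message` and `on_agent_content` would therefore see the same text twice, so pick the level that matches what you want to process.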

For streaming cases, the delta events (e.g. `AgentMessageDelta`, `AgentContentDelta`) provide incremental updates as the LLM generates its response. Task events, in contrast, are complete events that mark important points in a task's lifecycle; you can use them to track progress and get results without managing the task object directly.
